package com.j.lemon.learn.pulsar;

import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import java.nio.charset.StandardCharsets;
import org.apache.pulsar.client.api.*;

/**
 * @Author lijunjun
 * @Date 2020/9/14 11:25
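 * @Description Demo consumer on a Shared Pulsar subscription that sums the GATHER_COUNT field of the JSON messages it receives.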
 */
public class ConsumerShared3 {

    /** Topic this demo consumer subscribes to. */
    private static final String TOPIC = "persistent://4a/audit/gather_jdbc_count_topic_zzy55";

    /** Name of the Shared subscription used on {@link #TOPIC}. */
    private static final String SUBSCRIPTION_NAME = "name_test208055";

    /** Name this consumer registers under. */
    private static final String CONSUMER_NAME = "cd";

    /** JSON field whose integer value is added to the running total. */
    private static final String COUNT_FIELD = "GATHER_COUNT";

    /**
     * Subscribes to {@link #TOPIC} with a Shared subscription and, for each message received,
     * prints the payload followed by the running sum of its {@code GATHER_COUNT} field.
     *
     * <p>The loop never ends on its own. A message whose payload is not a JSON object with an
     * integer {@code GATHER_COUNT} is reported on stderr and skipped instead of terminating the
     * consumer. The consumer and the client are closed if the loop is left through an exception.
     *
     * @param args unused
     * @throws PulsarClientException if the client cannot be obtained, subscribing or receiving
     *     fails, or closing the consumer/client fails
     */
    public static void main(String[] args) throws PulsarClientException {
        try (PulsarClient pulsarClient = PulsarUtils.getClient();
             Consumer<byte[]> sharedConsumer = pulsarClient.newConsumer()
                     .subscriptionType(SubscriptionType.Shared)
                     .subscriptionName(SUBSCRIPTION_NAME)
                     .consumerName(CONSUMER_NAME)
                     .topic(TOPIC)
                     .subscribe()) {
            // long rather than int so a long-running session cannot silently wrap the total.
            long total = 0;
            while (true) {
                Message<byte[]> message = sharedConsumer.receive();
                // Decode once, with an explicit charset instead of the platform default.
                String payload = new String(message.getData(), StandardCharsets.UTF_8);
                try {
                    total += parseGatherCount(payload);
                } catch (JSONException | IllegalArgumentException e) {
                    System.err.println("skipping message without a usable " + COUNT_FIELD
                            + ": " + payload + " (" + e + ")");
                    continue;
                }
                System.out.println("receive message: " + payload);
                System.out.println("累加数量：" + total);
                // NOTE(review): messages are never acknowledged here (the acknowledge call was
                // disabled in the original), so presumably they stay in the subscription
                // backlog and may be redelivered -- confirm this is intended for the demo.
            }
        }
    }

    /**
     * Extracts the {@code GATHER_COUNT} value from a JSON message payload.
     *
     * @param payload message body, expected to be a JSON object
     * @return the integer value of the {@code GATHER_COUNT} field
     * @throws JSONException if the payload cannot be parsed as a JSON object
     * @throws IllegalArgumentException if the payload yields no JSON object, or the field is
     *     missing or not an integer ({@link NumberFormatException} is a subtype)
     */
    private static int parseGatherCount(String payload) {
        JSONObject json = JSONObject.parseObject(payload);
        if (json == null) {
            // Guard against a null parse result (presumably what an empty payload yields).
            throw new IllegalArgumentException("payload did not yield a JSON object");
        }
        // Integer.parseInt rejects a null (missing) field with NumberFormatException.
        return Integer.parseInt(json.getString(COUNT_FIELD));
    }
}
